/**
 * Project: apollo-base-commons
 * 
 * File Created at 2016年10月25日
 * 
 * Copyright 2015-2016 dx.com Croporation Limited.
 * All rights reserved.
 *
 * This software is the confidential and proprietary information of
 * DongXue software Company. ("Confidential Information").  You shall not
 * disclose such Confidential Information and shall use it only in
 * accordance with the terms of the license agreement you entered into
 * with dx.com.
 */
package com.dx.pf.commons.async.core;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.dx.pf.commons.async.pool.AsyncFutureTask;
import com.dx.pf.commons.async.pool.AsyncPoolCallable;
import com.dx.pf.commons.async.pool.AsyncThreadTaskPool;
import com.dx.pf.commons.async.pool.NamedThreadFactory;
import com.dx.pf.commons.constants.AsyncConstant;
import com.dx.pf.commons.log.Logger;
import com.dx.pf.commons.utils.StringUtil;

/** 
* @ClassName: AsyncExecutor 
* @Description: 异步线程执行器
* @author wuzhenfang(wzfbj2008@163.com)
* @date 2016年10月25日 下午1:48:51 
* @version V1.0 
*/
public class AsyncExecutor {

	private final static Logger logger = Logger.getLogger(AsyncExecutor.class);

	private static AsyncThreadTaskPool pool;

	private static AtomicBoolean isInit = new AtomicBoolean(false);
	private static AtomicBoolean isDestroyed = new AtomicBoolean(false);

	enum HandleMode {
		REJECT, CALLERRUN;
	}

	public static void initPool(Integer corePoolSize, Integer maxPoolSize, Integer maxAcceptCount, String rejectedExecutionHandler, Long keepAliveTime,
			Boolean allowCoreThreadTimeout) {

		if (!isInit.get()) {
			isInit.set(true);
			if (corePoolSize == null || corePoolSize <= 0) {
				corePoolSize = Runtime.getRuntime().availableProcessors() * 4;
			}
			if (maxPoolSize == null || maxPoolSize <= 0) {
				maxPoolSize = corePoolSize;
			}
			if (maxAcceptCount == null || maxAcceptCount < 0) {
				maxAcceptCount = (corePoolSize * 2);
			}
			HandleMode handleMode = HandleMode.CALLERRUN;
			if (!StringUtil.isEmpty(rejectedExecutionHandler)) {
				if ("REJECT".equals(rejectedExecutionHandler)) {
					handleMode = HandleMode.REJECT;
				}
			}
			if (keepAliveTime == null || keepAliveTime < 0) {
				keepAliveTime = AsyncConstant.ASYNC_DEFAULT_KEEPALIVETIME;
			}
			if (allowCoreThreadTimeout == null) {
				allowCoreThreadTimeout = true;
			}

			RejectedExecutionHandler handler = getRejectedHandler(handleMode);
			BlockingQueue<Runnable> queue = createQueue(maxAcceptCount);
			pool = new AsyncThreadTaskPool(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, queue, handler, new NamedThreadFactory());
			pool.getThreadPoolExecutor().allowCoreThreadTimeOut(allowCoreThreadTimeout);
			logger.info("ThreadPoolExecutor initialize info corePoolSize:"+corePoolSize
			+" maxPoolSize:"+maxPoolSize
					+" maxAcceptCount:"+maxAcceptCount
			+" rejectedExecutionHandler:"+handleMode);
		}
	}

	public static <T> AsyncFutureTask<T> submit(AsyncPoolCallable<T> task) {
		return submit(task, null);
	}

	public static <T> AsyncFutureTask<T> submit(AsyncPoolCallable<T> task, AsyncFutureCallback<T> callback) {
		if (!isInit.get()) {
			initPool(null, null, null, null, null, null);
		}
		return pool.submit(task, callback);
	}

	public static void destroy() {
		if (isInit.get() && (pool != null)) {
			pool.destroy();
			pool = null;
		}
	}

	private static BlockingQueue<Runnable> createQueue(int acceptCount) {

		if (acceptCount > 0) {
			return new LinkedBlockingQueue<Runnable>(acceptCount);
		} else {
			return new SynchronousQueue<Runnable>();
		}
	}

	private static RejectedExecutionHandler getRejectedHandler(HandleMode mode) {
		return HandleMode.REJECT == mode ? new ThreadPoolExecutor.AbortPolicy() : new ThreadPoolExecutor.CallerRunsPolicy();
	}

	public static boolean isDestroyed() {
		return isDestroyed.get();
	}

	public static void setIsDestroyed(boolean isDestroyed) {
		AsyncExecutor.isDestroyed.set(true);
	}
}
